package codemperor.mapreduce.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * ETL job whose mapper extracts {@code sender,time,msgType} from each line of a comma-separated
 * message export.
 *
 * <p>Usage: {@code MessageEtl [inputPath [outputPath]]}. When arguments are omitted the original
 * hard-coded paths are used, so existing invocations keep working.
 */
public class MessageEtl {
    public static final Logger log = Logger.getLogger(MessageEtl.class);

    /** Input file used when no input path is given on the command line. */
    private static final String DEFAULT_INPUT = "/Users/zhang.lu/Desktop/dingMessage.csv";
    /** Output directory used when no output path is given on the command line. */
    private static final String DEFAULT_OUTPUT = "/Users/zhang.lu/Desktop/mess";

    /** Job counters reported by {@link MessageEtlMapper}. */
    public enum EtlCounter {
        /** Input lines with too few comma-separated fields to extract a record; they are skipped. */
        MALFORMED_LINES
    }

    /**
     * Configures and runs the job, prints whether it succeeded, and exits with status 0 on success
     * and 1 on failure so calling scripts/schedulers can detect a failed run.
     *
     * @param args optional {@code [inputPath [outputPath]]}; defaults apply for missing entries
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        /* ********* Configuration ********* */
        Configuration configuration = new Configuration();
        // For testing over an external network: have the client contact DataNodes by hostname.
        configuration.set("dfs.client.use.datanode.hostname", "true");

        log.info("go go go");

        // Build the job.
        Job job = Job.getInstance(configuration, "MessageEtl");
        job.setJarByClass(MessageEtl.class);
        job.setMapperClass(MessageEtlMapper.class);

        // Map output types.
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        // No reducer class is set (Hadoop's default Reducer passes records through), so declare the
        // job's final output types to match what the mapper emits.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Job input and output paths.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean result = job.waitForCompletion(true);
        System.out.println(result);
        System.exit(result ? 0 : 1);
    }

    /**
     * Turns one input line into {@code sender,time,msgType}.
     *
     * <p>Fields come from a plain comma split: {@code msgType} is field index 3, {@code sender} is
     * the third-to-last field and {@code time} is the last field. Lines with fewer than
     * {@value #MIN_FIELDS} fields are skipped and counted in {@link EtlCounter#MALFORMED_LINES}.
     *
     * <p>NOTE(review): CSV quoting is not honored; taking fields from the end is presumably meant to
     * tolerate commas inside middle columns (e.g. message text) — confirm against the export format.
     */
    public static class MessageEtlMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        /** Smallest field count for which both {@code values[3]} and {@code values[length - 3]} exist. */
        private static final int MIN_FIELDS = 4;

        /** Output value reused across calls; {@code context.write} serializes it on each write. */
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Limit -1 keeps trailing empty fields; the default split drops them, which would shift
            // the end-relative "sender"/"time" columns onto the wrong fields.
            String[] values = value.toString().split(",", -1);
            if (values.length < MIN_FIELDS) {
                // Blank or truncated line: skip it instead of failing the task with an index error.
                context.getCounter(EtlCounter.MALFORMED_LINES).increment(1);
                return;
            }
            String sender = values[values.length - 3];
            String msgType = values[3];
            String time = values[values.length - 1];
            String record = sender + "," + time + "," + msgType;
            // Per-record output goes to DEBUG: logging every input line at INFO floods task logs.
            if (log.isDebugEnabled()) {
                log.debug(record);
            }
            outValue.set(record);
            context.write(NullWritable.get(), outValue);
        }
    }
}
